{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Dressmaker - Hard\n",
    "You may need to create views to complete these questions - but you do not have permission to create tables or views in the default schema. Your SQL commands are executed by user scott in schema gisq - you may create or drop views and tables in schema scott but not in gisq."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Loading spark-stubs, spark-hive\n",
      "Adding Hive conf dir /opt/hive/conf to classpath\n",
      "Creating SparkSession\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "SLF4J: No SLF4J providers were found.\n",
      "SLF4J: Defaulting to no-operation (NOP) logger implementation\n",
      "SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a target=\"_blank\" href=\"http://jupyter:4040\">Spark UI</a>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$                                  \n",
       "\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.types._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.functions._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.expressions.Window\n",
       "\n",
       "\u001b[39m\n",
       "\u001b[36mspark\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@19b92abc"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import $ivy.`org.apache.spark::spark-sql:3.4.0`\n",
    "\n",
    "import org.apache.log4j.{Level, Logger}\n",
    "Logger.getLogger(\"org\").setLevel(Level.OFF)\n",
    "\n",
    "import org.apache.spark._\n",
    "import org.apache.spark.sql._\n",
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.functions._\n",
    "import org.apache.spark.sql.expressions.Window\n",
    "\n",
    "// Notebook-wide session: runs locally with the Hive catalog enabled. To target the\n",
    "// standalone cluster instead, change the master to \"spark://192.168.31.31:7077\".\n",
    "val spark = {\n",
    "    NotebookSparkSession.builder()\n",
    "    .progress(false)\n",
    "    .appName(\"app17-3\")\n",
    "    .master(\"local[*]\")\n",
    "    .config(\"spark.sql.warehouse.dir\",\n",
    "            \"hdfs://192.168.31.31:9000/user/hive/warehouse\")\n",
    "    // NOTE(review): the core/executor sizing below presumably only matters once a\n",
    "    // cluster master is configured - confirm before tuning it.\n",
    "    .config(\"spark.cores.max\", \"4\")\n",
    "    .config(\"spark.executor.instances\", \"1\")\n",
    "    .config(\"spark.executor.cores\", \"2\")\n",
    "    .config(\"spark.executor.memory\", \"10g\")\n",
    "    .config(\"spark.shuffle.service.enabled\", \"false\")\n",
    "    .config(\"spark.dynamicAllocation.enabled\", \"false\")\n",
    "    .config(\"spark.sql.catalogImplementation\", \"hive\")\n",
    "    .config(\"spark.sql.repl.eagerEval.enabled\", \"true\")\n",
    "    // spark.driver.allowMultipleContexts is intentionally not set: Spark 3.x no longer honours it.\n",
    "    .getOrCreate()\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36mspark.implicits._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36mspark.sqlContext.implicits._\n",
       "\u001b[39m\n",
       "defined \u001b[32mfunction\u001b[39m \u001b[36msc\u001b[39m\n",
       "\u001b[36mhiveCxt\u001b[39m: \u001b[32msql\u001b[39m.\u001b[32mhive\u001b[39m.\u001b[32mHiveContext\u001b[39m = org.apache.spark.sql.hive.HiveContext@249010a7"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import spark.implicits._\n",
    "import spark.sqlContext.implicits._\n",
    "/** Shorthand for the SparkContext behind the notebook's session. */\n",
    "def sc: org.apache.spark.SparkContext = spark.sparkContext\n",
    "\n",
    "/** Hive-aware SQL context constructed on top of `sc`. */\n",
    "val hiveCxt: org.apache.spark.sql.hive.HiveContext =\n",
    "    new org.apache.spark.sql.hive.HiveContext(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined \u001b[32mclass\u001b[39m \u001b[36mRichDF\u001b[39m"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Credit to Aivean\n",
    "/** Adds `showHTML` to every DataFrame so results render as an HTML table in the notebook. */\n",
    "implicit class RichDF(val ds: DataFrame) {\n",
    "    /**\n",
    "     * Publishes the first `limit` rows of the DataFrame as an HTML table.\n",
    "     *\n",
    "     * @param limit    maximum number of rows fetched with `take`\n",
    "     * @param truncate maximum characters shown per cell; 0 or less disables truncation\n",
    "     */\n",
    "    def showHTML(limit: Int = 50, truncate: Int = 100) = {\n",
    "        import xml.Utility.escape\n",
    "\n",
    "        // Plain-text form of one cell; collections are bracketed, bytes shown as hex.\n",
    "        def render(value: Any): String = value match {\n",
    "            case null => \"null\"\n",
    "            case bytes: Array[Byte] => bytes.map(\"%02X\".format(_)).mkString(\"[\", \" \", \"]\")\n",
    "            case items: Array[_] => items.mkString(\"[\", \", \", \"]\")\n",
    "            case items: Seq[_] => items.mkString(\"[\", \", \", \"]\")\n",
    "            case other => other.toString\n",
    "        }\n",
    "\n",
    "        // Cuts over-long text; the ellipsis is skipped when fewer than 4 characters are allowed.\n",
    "        def clip(text: String): String =\n",
    "            if (truncate <= 0 || text.length <= truncate) text\n",
    "            else if (truncate < 4) text.substring(0, truncate)\n",
    "            else text.substring(0, truncate - 3) + \"...\"\n",
    "\n",
    "        val fetched = ds.take(limit)\n",
    "        val headerHtml = ds.schema.fieldNames.map(name => s\"<th>${escape(name)}</th>\").mkString\n",
    "        val bodyHtml = fetched.map { record =>\n",
    "            record.toSeq.map(cell => s\"<td>${escape(clip(render(cell)))}</td>\").mkString(\"<tr>\", \"\", \"</tr>\")\n",
    "        }.mkString\n",
    "\n",
    "        publish.html(s\"\"\" <table>\n",
    "                <tr>\n",
    "                 ${headerHtml}\n",
    "                </tr>\n",
    "                ${bodyHtml}\n",
    "            </table>\n",
    "        \"\"\")\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[36mjmcust\u001b[39m: \u001b[32mDataFrame\u001b[39m = [c_no: int, c_name: string ... 2 more fields]\n",
       "\u001b[36mdressmaker\u001b[39m: \u001b[32mDataFrame\u001b[39m = [d_no: int, d_name: string ... 2 more fields]\n",
       "\u001b[36mdress_order\u001b[39m: \u001b[32mDataFrame\u001b[39m = [order_no: int, cust_no: int ... 2 more fields]\n",
       "\u001b[36mconstruction\u001b[39m: \u001b[32mDataFrame\u001b[39m = [maker: int, order_ref: int ... 3 more fields]\n",
       "\u001b[36mquantities\u001b[39m: \u001b[32mDataFrame\u001b[39m = [style_q: int, size_q: int ... 1 more field]\n",
       "\u001b[36morder_line\u001b[39m: \u001b[32mDataFrame\u001b[39m = [order_ref: int, line_no: int ... 3 more fields]\n",
       "\u001b[36mgarment\u001b[39m: \u001b[32mDataFrame\u001b[39m = [style_no: int, description: string ... 2 more fields]\n",
       "\u001b[36mmaterial\u001b[39m: \u001b[32mDataFrame\u001b[39m = [material_no: int, fabric: string ... 3 more fields]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Handles on the exercise tables in the Hive database `sqlzoo`. They are read through the\n",
    "// notebook's SparkSession (created with the Hive catalog) instead of the deprecated HiveContext.\n",
    "val jmcust = spark.table(\"sqlzoo.jmcust\")\n",
    "val dressmaker = spark.table(\"sqlzoo.dressmaker\")\n",
    "val dress_order = spark.table(\"sqlzoo.dress_order\")\n",
    "val construction = spark.table(\"sqlzoo.construction\")\n",
    "val quantities = spark.table(\"sqlzoo.quantities\")\n",
    "val order_line = spark.table(\"sqlzoo.order_line\")\n",
    "val garment = spark.table(\"sqlzoo.garment\")\n",
    "val material = spark.table(\"sqlzoo.material\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1.\n",
    "When creating a view in scott you must specify the schema name of the sources and the destination."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2.\n",
    "It is decided to review the materials stock. How much did each material contribute to turnover in 2002?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>material_no</th><th>fabric</th><th>colour</th><th>pattern</th><th>sum(quantity)</th><th>sum(cost)</th>\n",
       "                </tr>\n",
       "                <tr><td>7</td><td>Polyester           </td><td>Pale Yellow         </td><td>Printed             </td><td>4.3</td><td>10.96</td></tr><tr><td>10</td><td>Silk                </td><td>Green Abstract      </td><td>Printed             </td><td>8.3</td><td>124.5</td></tr><tr><td>5</td><td>Cotton              </td><td>Black Dotted        </td><td>Woven               </td><td>6.4</td><td>19.2</td></tr><tr><td>1</td><td>Silk                </td><td>Black               </td><td>Plain               </td><td>4.9</td><td>34.3</td></tr><tr><td>14</td><td>Cotton              </td><td>Green Abstract      </td><td>Printed             </td><td>4.5</td><td>15.75</td></tr><tr><td>11</td><td>Rayon               </td><td>Red/Orange          </td><td>Printed             </td><td>9.7</td><td>38.8</td></tr><tr><td>6</td><td>Cotton              </td><td>Red Stripe          </td><td>Woven               </td><td>2.2</td><td>6.6</td></tr><tr><td>8</td><td>Cotton              </td><td>Blue Stripe         </td><td>Woven               </td><td>4.2</td><td>12.6</td></tr><tr><td>12</td><td>Serge               </td><td>Navy Blue           </td><td>Woven               </td><td>4.5</td><td>24.75</td></tr><tr><td>4</td><td>Cotton              </td><td>Green Stripe        </td><td>Woven               </td><td>2.2</td><td>6.6</td></tr><tr><td>9</td><td>Cotton              </td><td>Pink Check          </td><td>Woven               </td><td>4.6</td><td>13.8</td></tr><tr><td>3</td><td>Cotton              </td><td>Yellow Stripe       </td><td>Woven               </td><td>5.7</td><td>17.1</td></tr><tr><td>13</td><td>Cotton              </td><td>Blue Abstract       </td><td>Printed             </td><td>4.6</td><td>16.1</td></tr><tr><td>2</td><td>Silk                </td><td>Red Abstract        </td><td>Printed             </td><td>9.3</td><td>93.0</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Q2: quantity used and material cost per material, for orders dated 2002.\n",
    "// Wrapped in `locally` so the helper values stay out of the notebook namespace.\n",
    "locally {\n",
    "    val orders2002 = dress_order.filter(year(dress_order(\"order_date\")) === 2002)\n",
    "\n",
    "    // Join keys, named so the pipeline below reads top-down.\n",
    "    val usesMaterial = (material(\"material_no\") === order_line(\"ol_material\"))\n",
    "    val sameSizeAndStyle = ((order_line(\"ol_size\") === quantities(\"size_q\")) &&\n",
    "                            (order_line(\"ol_style\") === quantities(\"style_q\")))\n",
    "    val onOrder = (order_line(\"order_ref\") === dress_order(\"order_no\"))\n",
    "\n",
    "    val perLine = material\n",
    "        .join(order_line, usesMaterial)\n",
    "        .join(quantities, sameSizeAndStyle)\n",
    "        .join(orders2002, onOrder)\n",
    "        // From here on `cost` is overwritten with cost x quantity for the line.\n",
    "        .withColumn(\"cost\", $\"cost\" * $\"quantity\")\n",
    "\n",
    "    val perMaterial = perLine\n",
    "        .groupBy(\"material_no\", \"fabric\", \"colour\", \"pattern\")\n",
    "        .agg(sum(\"quantity\"), sum(\"cost\"))\n",
    "        .withColumn(\"sum(cost)\", round($\"sum(cost)\", 2))\n",
    "\n",
    "    perMaterial.showHTML()\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3.\n",
    "An order for shorts has just been placed and the work is to be distributed amongst the workforce, and we wish to know how busy the shorts makers are. For each of the workers who have experience of making shorts show the number of hours of work that she is currently committed to, assuming a meagre wage of £4.50 per hour."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>d_name</th><th>sum(hrs)</th>\n",
       "                </tr>\n",
       "                <tr><td>Mr Seam             </td><td>28.17</td></tr><tr><td>Ms Sew              </td><td>18.61</td></tr><tr><td>Miss Stitch         </td><td>49.17</td></tr><tr><td>Miss Pins           </td><td>28.17</td></tr><tr><td>Mr Needles          </td><td>18.61</td></tr><tr><td>Mr Taylor           </td><td>18.61</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Q3: outstanding hours for every dressmaker who has made shorts before.\n",
    "// Construction rows are matched to order lines on BOTH order_ref and line_ref, so a\n",
    "// dressmaker is only credited with the lines assigned to them, not every line of the order.\n",
    "locally {\n",
    "    // Hourly wage given in the question; converts a garment's labour cost into hours.\n",
    "    val hourlyWage = 4.5\n",
    "\n",
    "    val assignedLine = ((construction(\"order_ref\") === order_line(\"order_ref\")) &&\n",
    "                        (construction(\"line_ref\") === order_line(\"line_no\")))\n",
    "    val shortsStyles = garment.filter(lower(trim(garment(\"description\"))) === \"shorts\")\n",
    "    val openOrders = dress_order.filter(dress_order(\"completed\") === \"N\")\n",
    "\n",
    "    // Dressmakers with at least one shorts line among their construction records.\n",
    "    val shortsMakers = (dressmaker\n",
    "        .join(construction, (dressmaker(\"d_no\") === construction(\"maker\")))\n",
    "        .join(order_line, assignedLine)\n",
    "        .join(shortsStyles, (order_line(\"ol_style\") === garment(\"style_no\")))\n",
    "        .select(\"d_no\")\n",
    "        .distinct())\n",
    "\n",
    "    // Labour hours on the lines those dressmakers hold in orders not yet completed.\n",
    "    (dressmaker\n",
    "     .join(shortsMakers, \"d_no\")\n",
    "     .join(construction, (dressmaker(\"d_no\") === construction(\"maker\")))\n",
    "     .join(openOrders, (construction(\"order_ref\") === dress_order(\"order_no\")))\n",
    "     .join(order_line, assignedLine)\n",
    "     .join(garment.withColumn(\"hrs\", garment(\"labour_cost\") / hourlyWage),\n",
    "           (order_line(\"ol_style\") === garment(\"style_no\")))\n",
    "     .groupBy(\"d_name\")\n",
    "     .sum(\"hrs\")\n",
    "     .withColumn(\"sum(hrs)\", round($\"sum(hrs)\", 2))\n",
    "     .showHTML())\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4.\n",
    "\"Big spender of the year\" is the customer who spends the most on high value items. Identify the \"Big spender of the year 2002\" if the \"high value\" threshold is set at £30. Also who would it be if the threshold was £20 or £50?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>c_name</th><th>tot_cost</th><th>thres</th>\n",
       "                </tr>\n",
       "                <tr><td>Mr Brass            </td><td>198.54</td><td>20</td></tr><tr><td>Ms White            </td><td>173.55</td><td>30</td></tr><tr><td>Mr Brass            </td><td>72.0</td><td>50</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[36mt\u001b[39m: \u001b[32mDataFrame\u001b[39m = [order_ref: int, line_no: int ... 16 more fields]\n",
       "\u001b[36mc\u001b[39m: \u001b[32mDataFrame\u001b[39m = [order_no: int, cust_no: int ... 9 more fields]\n",
       "\u001b[36mlst\u001b[39m: \u001b[32mList\u001b[39m[\u001b[32mDataFrame\u001b[39m] = \u001b[33mList\u001b[39m(\n",
       "  [c_name: string, tot_cost: double ... 1 more field],\n",
       "  [c_name: string, tot_cost: double ... 1 more field],\n",
       "  [c_name: string, tot_cost: double ... 1 more field]\n",
       ")"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Every order line with its total cost: labour plus quantity x material cost.\n",
    "val t = {\n",
    "    val sizedFor = ((order_line(\"ol_style\") === quantities(\"style_q\")) &&\n",
    "                    (order_line(\"ol_size\") === quantities(\"size_q\")))\n",
    "    order_line\n",
    "        .join(quantities, sizedFor)\n",
    "        .join(garment, (order_line(\"ol_style\") === garment(\"style_no\")))\n",
    "        .join(material, (order_line(\"ol_material\") === material(\"material_no\")))\n",
    "        .withColumn(\"tot_cost\", $\"labour_cost\" + $\"quantity\" * $\"cost\")\n",
    "}\n",
    "\n",
    "// Orders dated 2002 with their customer, one row per costed order line.\n",
    "val c = {\n",
    "    val orders2002 = dress_order.filter(year(dress_order(\"order_date\")) === 2002)\n",
    "    val lineCosts = t.select(\"order_ref\", \"line_no\", \"tot_cost\")\n",
    "    orders2002\n",
    "        .join(jmcust, (dress_order(\"cust_no\") === jmcust(\"c_no\")))\n",
    "        .join(lineCosts, ($\"order_no\" === t(\"order_ref\")))\n",
    "}\n",
    "\n",
    "// For each threshold: keep lines costing at least that much, total them per customer,\n",
    "// and keep the single highest total.\n",
    "val lst = for (thres <- List(20, 30, 50)) yield {\n",
    "    c.filter($\"tot_cost\" >= thres)\n",
    "     .groupBy(\"c_name\")\n",
    "     .agg(sum(\"tot_cost\").alias(\"tot_cost\"))\n",
    "     .orderBy($\"tot_cost\".desc)\n",
    "     .limit(1)\n",
    "     .select(\"c_name\", \"tot_cost\")\n",
    "     .withColumn(\"tot_cost\", round($\"tot_cost\", 2))\n",
    "     .withColumn(\"thres\", lit(thres))\n",
    "}\n",
    "\n",
    "lst.reduce(_ union _).showHTML()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5.\n",
    "Who is the fastest at making trousers?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>d_no</th><th>d_name</th><th>days</th>\n",
       "                </tr>\n",
       "                <tr><td>3</td><td>Mr Needles          </td><td>3</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Q5: the dressmaker with the shortest start-to-finish time on a single trousers line.\n",
    "(dressmaker\n",
    " .join(construction, (dressmaker(\"d_no\") === construction(\"maker\")))\n",
    " .join(order_line, ((construction(\"order_ref\") === order_line(\"order_ref\")) &&\n",
    "                    (construction(\"line_ref\") === order_line(\"line_no\"))))\n",
    " .join(garment.filter(lower(trim(garment(\"description\"))) === \"trousers\"),\n",
    "       (order_line(\"ol_style\") === garment(\"style_no\")))\n",
    " // datediff yields the whole-day difference as an integer, so no interval-to-integer\n",
    " // cast is needed.\n",
    " .withColumn(\"days\", datediff(to_date(col(\"finish_date\")), to_date(col(\"start_date\"))))\n",
    " .select(\"d_no\", \"d_name\", \"days\")\n",
    " // Rows without a computable duration (presumably unfinished garments) are excluded.\n",
    " .na.drop(Array(\"days\"))\n",
    " .orderBy(\"days\")\n",
    " .limit(1)\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6.\n",
    "\"Employee of the month\" is the seamstress who completes the greatest value of clothes. Show the \"employees of the month\" for months in 2002."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>d_name</th><th>month</th><th>val</th>\n",
       "                </tr>\n",
       "                <tr><td>Miss Stitch         </td><td>1</td><td>49.0</td></tr><tr><td>Mrs Hem             </td><td>2</td><td>122.25</td></tr><tr><td>Miss Stitch         </td><td>3</td><td>97.2</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Q6: per month of 2002, the dressmaker with the greatest total value of garments.\n",
    "// The question is about garments *completed*, so both the 2002 filter and the month\n",
    "// bucket use finish_date; rows with no finish date drop out of the year filter.\n",
    "(material\n",
    " .join(order_line, (material(\"material_no\") === order_line(\"ol_material\")))\n",
    " .join(quantities, ((order_line(\"ol_style\") === quantities(\"style_q\")) &&\n",
    "                    (order_line(\"ol_size\") === quantities(\"size_q\"))))\n",
    " .join(garment, (order_line(\"ol_style\") === garment(\"style_no\")))\n",
    " .join(construction.filter(year(to_date(construction(\"finish_date\"))) === 2002),\n",
    "       ((construction(\"order_ref\") === order_line(\"order_ref\")) &&\n",
    "        (construction(\"line_ref\") === order_line(\"line_no\"))))\n",
    " .join(dressmaker, (construction(\"maker\") === dressmaker(\"d_no\")))\n",
    " .withColumn(\"month\", month(to_date(col(\"finish_date\"))))\n",
    " // Value of one garment: material (quantity x cost) plus labour.\n",
    " .withColumn(\"val\", col(\"quantity\") * col(\"cost\") + col(\"labour_cost\"))\n",
    " .groupBy(\"d_name\", \"month\")\n",
    " .agg(sum(\"val\").alias(\"val\"))\n",
    " // rank() keeps every dressmaker tied for first place in a month.\n",
    " .withColumn(\"sn\", rank().over(\n",
    "     Window.partitionBy(\"month\").orderBy(col(\"val\").desc)))\n",
    " .filter(col(\"sn\") === 1)\n",
    " .select(\"d_name\", \"month\", \"val\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Stop the notebook's SparkSession; no cell after this one can run Spark queries.\n",
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".sc",
   "mimetype": "text/x-scala",
   "name": "scala",
   "nbconvert_exporter": "script",
   "version": "2.12.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
